package com.dongdongshop.mq;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.dongdongshop.pojo.TbOrder;
import com.dongdongshop.pojo.TbUserScore;
import com.dongdongshop.service.OrderService;
import com.dongdongshop.service.UserScoreService;
import com.dongdongshop.service.impl.UserScoreServiceImpl;
import net.sf.jsqlparser.expression.LongValue;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;

/**
 * RocketMQ consumer for topic {@code tx-topic} (consumer group {@code tx-consumer}).
 *
 * <p>For every message received, the message body is passed to
 * {@link OrderService#selectOrderList}, and the user of the first returned order is
 * credited with a fixed score through the user-score service. Ids of messages that were
 * handled successfully are recorded in a Redis hash so that a redelivered message is
 * acknowledged without crediting the score a second time.
 *
 * <p>A failing message is retried; once it has already been redelivered more than
 * {@link #MAX_RECONSUME_TIMES} times it is sent back with delay level
 * {@link #DELAY_LEVEL_DEAD_LETTER} so that it is routed to the dead-letter queue.
 */
@Component
@RocketMQMessageListener( topic = "tx-topic", consumerGroup = "tx-consumer")
public class MQConsumerLinster implements RocketMQListener, RocketMQPushConsumerLifecycleListener {

    /** Redis hash holding the ids (hash key) and bodies (hash value) of consumed messages. */
    private static final String CONSUMED_MESSAGES_KEY = "txmessage";

    /** Score credited to the ordering user for each consumed message. */
    private static final long ORDER_SCORE = 2000L;

    /** A message whose reconsume count exceeds this value is no longer retried. */
    private static final int MAX_RECONSUME_TIMES = 2;

    /** Delay level meaning "do not retry, put into the dead-letter queue directly". */
    private static final int DELAY_LEVEL_DEAD_LETTER = -1;

    @Autowired
    private UserScoreServiceImpl userScoreService;

    @Autowired
    private RedisTemplate redisTemplate;

    @Reference
    private OrderService orderService;

    /**
     * Callback required by {@link RocketMQListener}; casts the payload to
     * {@link TbUserScore} and prints it.
     *
     * <p>NOTE(review): {@link #prepareStart} registers its own listener on the consumer,
     * which presumably replaces the one that would dispatch to this method - confirm
     * against the rocketmq-spring container before relying on this callback.
     *
     * @param o the message payload; must be a {@link TbUserScore}
     */
    @Override
    public void onMessage(Object o) {
        TbUserScore userScore = (TbUserScore)o;
        System.out.println(userScore);
    }

    /**
     * Registers the concurrent message listener that performs the actual consumption.
     *
     * @param defaultMQPushConsumer the push consumer about to be started
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Handle every message of the batch, not only the first one. The returned
                // status applies to the whole batch, so stop at the first failure; messages
                // that were already handled are skipped on redelivery by the Redis check.
                for (MessageExt messageExt : list) {
                    ConsumeConcurrentlyStatus status = consumeOne(messageExt, consumeConcurrentlyContext);
                    if (status != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
                        return status;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }

    /**
     * Consumes a single message idempotently.
     *
     * @param messageExt the message to consume
     * @param context    consume context, used to request dead-letter routing
     * @return {@code CONSUME_SUCCESS} when the message was handled or had been handled
     *         before; {@code RECONSUME_LATER} when handling failed
     */
    private ConsumeConcurrentlyStatus consumeOne(MessageExt messageExt, ConsumeConcurrentlyContext context) {
        String msgId = messageExt.getMsgId();
        System.out.println("消息ID>>>" + msgId);
        int reconsumeTimes = messageExt.getReconsumeTimes();
        System.out.println("消息重试次数>>>" + reconsumeTimes);
        try {
            // Check for a previous consumption BEFORE performing any side effect, so a
            // redelivered message never credits the score twice.
            if (Boolean.TRUE.equals(redisTemplate.boundHashOps(CONSUMED_MESSAGES_KEY).hasKey(msgId))) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            // Decode with an explicit charset instead of the platform default.
            String message = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            System.out.println("消息内容>>>" + message);
            System.out.println("消息主题>>>" + messageExt.getTopic());

            addScoreForOrder(message);

            // Mark the message as consumed only after the score was credited.
            redisTemplate.boundHashOps(CONSUMED_MESSAGES_KEY).put(msgId, message);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // Report the failure instead of swallowing it silently.
            System.err.println("Failed to consume message " + msgId
                    + " (reconsumeTimes=" + reconsumeTimes + "): " + e);
            e.printStackTrace();
            if (reconsumeTimes > MAX_RECONSUME_TIMES) {
                // The delay level is only honoured when the message is sent back to the
                // broker, which happens for RECONSUME_LATER; returning CONSUME_SUCCESS
                // here would acknowledge and drop the message instead of dead-lettering it.
                context.setDelayLevelWhenNextConsume(DELAY_LEVEL_DEAD_LETTER);
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    /**
     * Looks up the orders for the given message body and credits {@link #ORDER_SCORE}
     * to the user of the first order.
     *
     * @param message the decoded message body, passed to
     *                {@link OrderService#selectOrderList} (presumably an order
     *                identifier - confirm against the producer)
     * @throws IllegalStateException if no order is returned for the message
     */
    private void addScoreForOrder(String message) {
        List<TbOrder> orderList = orderService.selectOrderList(message);
        if (orderList == null || orderList.isEmpty()) {
            throw new IllegalStateException("No order found for message body: " + message);
        }
        TbOrder order = orderList.get(0);
        String userId = order.getUserId();
        Date now = new Date();
        TbUserScore userScore = new TbUserScore(userId, ORDER_SCORE, now, now);
        userScoreService.addUserScore(userScore);
    }
}
